package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.DimAsyncFunction;
import com.atguigu.gmall.realtime.bean.OrderDetail;
import com.atguigu.gmall.realtime.bean.OrderInfo;
import com.atguigu.gmall.realtime.bean.OrderWide;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import com.ibm.icu.text.SimpleDateFormat;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Flink streaming job that prepares the order wide table.
 *
 * <p>The job reads order headers from the Kafka topic {@code dwd_order_info} and order
 * lines from {@code dwd_order_detail}, interval-joins them on the order id, enriches
 * every joined {@link OrderWide} record with user, province, SKU, SPU, category and
 * trademark attributes through asynchronous dimension lookups, and writes the result
 * as JSON to the Kafka topic {@code dwm_order_wide}.
 */
public class OrderWideApp {

    /** Timeout, in seconds, applied to every asynchronous dimension lookup. */
    private static final long DIM_LOOKUP_TIMEOUT_SECONDS = 60L;

    /** Pattern of the {@code create_time} field carried by order and order-detail records. */
    private static final String CREATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Builds the order wide-table pipeline and submits it for execution.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {
        // Step 1: set up the streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Step 2: read both input topics from Kafka with the same consumer group.
        String groupId = "OrderWideApp";

        String orderInfoTopic = "dwd_order_info";
        FlinkKafkaConsumer<String> orderInfoSource = MyKafka.getFlinkKafkaConsumer(orderInfoTopic, groupId);
        DataStreamSource<String> orderInfoStrDStream = env.addSource(orderInfoSource);

        String orderDetailsTopic = "dwd_order_detail";
        FlinkKafkaConsumer<String> orderDetailsSource = MyKafka.getFlinkKafkaConsumer(orderDetailsTopic, groupId);
        DataStreamSource<String> orderDetailStrDStream = env.addSource(orderDetailsSource);

        // Step 3: turn the JSON strings into beans and derive the epoch-millisecond
        // event time (create_ts) from the textual create_time.
        // NOTE(review): SimpleDateFormat resolves to the ICU4J class imported by this
        // file, not java.text.SimpleDateFormat - confirm that is intended.
        SingleOutputStreamOperator<OrderInfo> orderInfoDStream = orderInfoStrDStream.map(
                new RichMapFunction<String, OrderInfo>() {
                    // Created in open() so each function instance owns its own formatter.
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat(CREATE_TIME_PATTERN);
                    }

                    @Override
                    public OrderInfo map(String value) throws Exception {
                        OrderInfo orderInfo = JSONObject.parseObject(value, OrderInfo.class);
                        orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());
                        return orderInfo;
                    }
                }
        );

        SingleOutputStreamOperator<OrderDetail> orderDetailDStream = orderDetailStrDStream.map(
                new RichMapFunction<String, OrderDetail>() {
                    // Created in open() so each function instance owns its own formatter.
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat(CREATE_TIME_PATTERN);
                    }

                    @Override
                    public OrderDetail map(String value) throws Exception {
                        OrderDetail orderDetail = JSONObject.parseObject(value, OrderDetail.class);
                        // create_ts is the event time used for the watermark below.
                        orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
                        return orderDetail;
                    }
                }
        );

        // Step 4: assign event-time timestamps and watermarks from create_ts.
        // NOTE(review): forMonotonousTimestamps expects ascending timestamps per source
        // split; if records can arrive out of order, a bounded-out-of-orderness strategy
        // may be needed - confirm against the upstream topics.
        SingleOutputStreamOperator<OrderInfo> orderInfoWithWaterMarkDStream = orderInfoDStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                            @Override
                            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                return orderInfo.getCreate_ts();
                            }
                        })
        );

        SingleOutputStreamOperator<OrderDetail> orderDetailWithWaterMarkDStream = orderDetailDStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                            @Override
                            public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                                return orderDetail.getCreate_ts();
                            }
                        })
        );

        // Step 5: key both streams by the order id so matching records meet in the join.
        KeyedStream<OrderInfo, Long> orderInfoLongKeyedStream = orderInfoWithWaterMarkDStream.keyBy(r -> r.getId());
        KeyedStream<OrderDetail, Long> orderDetailLongKeyedStream = orderDetailWithWaterMarkDStream.keyBy(r -> r.getOrder_id());

        // Step 6: interval join - pair each order with the details whose event time lies
        // within 5 seconds before or after the order's event time.
        SingleOutputStreamOperator<OrderWide> orderWideDStream = orderInfoLongKeyedStream.intervalJoin(orderDetailLongKeyedStream)
                .between(Time.seconds(-5), Time.seconds(5))
                .process(
                        new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                            @Override
                            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                                out.collect(new OrderWide(orderInfo, orderDetail));
                            }
                        }
                );

        // Step 7: user dimension - fills user_gender and user_age.
        SingleOutputStreamOperator<OrderWide> orderWideWithUserDStream = withDimension(
                orderWideDStream,
                new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getUser_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) throws ParseException {
                        // Resolve the age first so a malformed birthday fails before the
                        // record is touched.
                        Integer age = null;
                        String birthday = dimInfoJsonObj.getString("BIRTHDAY");
                        if (birthday != null) {
                            // A fresh formatter per call: join may presumably be invoked
                            // from several lookup threads, and date formatters must not
                            // be shared between threads.
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                            Date birthdayDate = sdf.parse(birthday);
                            age = ageInYears(birthdayDate, LocalDate.now());
                        }

                        orderWide.setUser_gender(dimInfoJsonObj.getString("GENDER"));
                        // Without a birthday the age stays unset instead of failing the record.
                        if (age != null) {
                            orderWide.setUser_age(age);
                        }
                    }
                }
        );

        // Step 8: province dimension - fills the province name and its codes.
        SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDStream = withDimension(
                orderWideWithUserDStream,
                new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getProvince_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) {
                        orderWide.setProvince_name(dimInfoJsonObj.getString("NAME"));
                        orderWide.setProvince_area_code(dimInfoJsonObj.getString("AREA_CODE"));
                        orderWide.setProvince_iso_code(dimInfoJsonObj.getString("ISO_CODE"));
                        orderWide.setProvince_3166_2_code(dimInfoJsonObj.getString("ISO_3166_2"));
                    }
                }
        );

        // Step 9: SKU dimension - fills sku_name plus the spu_id, tm_id and category3_id
        // that the following three lookups use as their keys, so it must run before them.
        SingleOutputStreamOperator<OrderWide> orderWideWithSkuDStream = withDimension(
                orderWideWithProvinceDStream,
                new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return orderWide.getSku_id().toString();
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) {
                        orderWide.setSku_name(dimInfoJsonObj.getString("SKU_NAME"));
                        orderWide.setSpu_id(dimInfoJsonObj.getLong("SPU_ID"));
                        orderWide.setTm_id(dimInfoJsonObj.getLong("TM_ID"));
                        orderWide.setCategory3_id(dimInfoJsonObj.getLong("CATEGORY3_ID"));
                    }
                }
        );

        // Step 10: SPU dimension - fills spu_name.
        SingleOutputStreamOperator<OrderWide> orderWideWithSpuDStream = withDimension(
                orderWideWithSkuDStream,
                new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getSpu_id());
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) {
                        orderWide.setSpu_name(dimInfoJsonObj.getString("SPU_NAME"));
                    }
                }
        );

        // Step 11: category dimension - fills category3_name.
        SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DStream = withDimension(
                orderWideWithSpuDStream,
                new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getCategory3_id());
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) {
                        orderWide.setCategory3_name(dimInfoJsonObj.getString("NAME"));
                    }
                }
        );

        // Step 12: trademark dimension - fills tm_name.
        SingleOutputStreamOperator<OrderWide> orderWideWithTrademarkDStream = withDimension(
                orderWideWithCategory3DStream,
                new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
                    @Override
                    public String getKey(OrderWide orderWide) {
                        return String.valueOf(orderWide.getTm_id());
                    }

                    @Override
                    public void join(OrderWide orderWide, JSONObject dimInfoJsonObj) {
                        orderWide.setTm_name(dimInfoJsonObj.getString("TM_NAME"));
                    }
                }
        );

        // Print the fully enriched records to standard output.
        orderWideWithTrademarkDStream.print("品牌表属性！=null>>>>");

        // Step 13: write the wide-table records back to Kafka as JSON.
        String orderWideSinkTopic = "dwm_order_wide";
        orderWideWithTrademarkDStream
                .map(orderWide -> JSONObject.toJSONString(orderWide))
                .addSink(MyKafkaPro.getFlinkKafkaProducer(orderWideSinkTopic));

        env.execute();
    }

    /**
     * Applies one asynchronous dimension lookup to the stream.
     *
     * <p>Uses {@code AsyncDataStream.unorderedWait}, so results are emitted as soon as
     * each lookup completes rather than in input order, with the shared
     * {@link #DIM_LOOKUP_TIMEOUT_SECONDS} timeout.
     *
     * @param input       stream of wide-table records to enrich
     * @param dimFunction lookup function supplying the key and copying the dimension
     *                    attributes onto the record
     * @return the stream produced by the asynchronous operator
     */
    private static SingleOutputStreamOperator<OrderWide> withDimension(
            SingleOutputStreamOperator<OrderWide> input,
            DimAsyncFunction<OrderWide> dimFunction) {
        return AsyncDataStream.unorderedWait(
                input,
                dimFunction,
                DIM_LOOKUP_TIMEOUT_SECONDS,
                TimeUnit.SECONDS
        );
    }

    /**
     * Returns the number of completed calendar years between a birthday and a date.
     *
     * <p>Calendar arithmetic is used instead of dividing elapsed milliseconds by a fixed
     * 365-day year, which ignores leap days and reports the next age a few days before
     * the actual birthday.
     *
     * @param birthday the birthday as an instant; interpreted in the system default zone
     * @param today    the date at which the age is evaluated
     * @return full years from {@code birthday} to {@code today}; negative when the
     *         birthday lies after {@code today}
     */
    private static int ageInYears(Date birthday, LocalDate today) {
        LocalDate birthDate = birthday.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
        return Period.between(birthDate, today).getYears();
    }
}
